1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16 package rx.internal.operators;
17
18 import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
19 import rx.Observable;
20 import rx.Scheduler;
21 import rx.Subscriber;
22 import rx.functions.Action0;
23 import rx.functions.Func2;
24 import rx.schedulers.Schedulers;
25 import rx.subscriptions.SerialSubscription;
26
27 public final class OperatorRetryWithPredicate<T> implements Observable.Operator<T, Observable<T>> {
28 final Func2<Integer, Throwable, Boolean> predicate;
29 public OperatorRetryWithPredicate(Func2<Integer, Throwable, Boolean> predicate) {
30 this.predicate = predicate;
31 }
32
33 @Override
34 public Subscriber<? super Observable<T>> call(final Subscriber<? super T> child) {
35 final Scheduler.Worker inner = Schedulers.trampoline().createWorker();
36 child.add(inner);
37
38 final SerialSubscription serialSubscription = new SerialSubscription();
39
40 child.add(serialSubscription);
41
42 return new SourceSubscriber<T>(child, predicate, inner, serialSubscription);
43 }
44
45 static final class SourceSubscriber<T> extends Subscriber<Observable<T>> {
46 final Subscriber<? super T> child;
47 final Func2<Integer, Throwable, Boolean> predicate;
48 final Scheduler.Worker inner;
49 final SerialSubscription serialSubscription;
50
51 volatile int attempts;
52 @SuppressWarnings("rawtypes")
53 static final AtomicIntegerFieldUpdater<SourceSubscriber> ATTEMPTS_UPDATER
54 = AtomicIntegerFieldUpdater.newUpdater(SourceSubscriber.class, "attempts");
55
56 public SourceSubscriber(Subscriber<? super T> child, final Func2<Integer, Throwable, Boolean> predicate, Scheduler.Worker inner,
57 SerialSubscription serialSubscription) {
58 this.child = child;
59 this.predicate = predicate;
60 this.inner = inner;
61 this.serialSubscription = serialSubscription;
62 }
63
64
65 @Override
66 public void onCompleted() {
67
68 }
69
70 @Override
71 public void onError(Throwable e) {
72 child.onError(e);
73 }
74
75 @Override
76 public void onNext(final Observable<T> o) {
77 inner.schedule(new Action0() {
78
79 @Override
80 public void call() {
81 final Action0 _self = this;
82 ATTEMPTS_UPDATER.incrementAndGet(SourceSubscriber.this);
83
84
85
86 Subscriber<T> subscriber = new Subscriber<T>() {
87 boolean done;
88 @Override
89 public void onCompleted() {
90 if (!done) {
91 done = true;
92 child.onCompleted();
93 }
94 }
95
96 @Override
97 public void onError(Throwable e) {
98 if (!done) {
99 done = true;
100 if (predicate.call(attempts, e) && !inner.isUnsubscribed()) {
101
102 inner.schedule(_self);
103 } else {
104
105 child.onError(e);
106 }
107 }
108 }
109
110 @Override
111 public void onNext(T v) {
112 if (!done) {
113 child.onNext(v);
114 }
115 }
116
117 };
118
119 serialSubscription.set(subscriber);
120 o.unsafeSubscribe(subscriber);
121 }
122 });
123 }
124 }
125 }